/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/*********************************************************************
 *   $Id: we_bulkloadbuffer.cpp 4661 2013-06-04 12:59:50Z dcathey $
 *
 ********************************************************************/

#include <sys/time.h>
#include <sstream>
#include <string>
#include <stdint.h>
#include <cerrno>
#include <cstring>
#include <cstdlib>  // includes <alloca.h> on linux
#include <cmath>
#include <ctype.h>
#include <cfloat>

#include "we_bulkload.h"
#include "we_bulkloadbuffer.h"
#include "we_brm.h"
#include "we_convertor.h"
#include "we_log.h"
#include "brmtypes.h"
#include "dataconvert.h"
#include "exceptclasses.h"
#include "mcs_decimal.h"

#include "joblisttypes.h"

using namespace std;
using namespace boost;
using namespace execplan;


namespace
{
const std::string INPUT_ERROR_WRONG_NO_COLUMNS = "Data contains wrong number of columns";
const std::string INPUT_ERROR_TOO_LONG = "Data in wrong format; exceeds max field length; ";
const std::string INPUT_ERROR_NULL_CONSTRAINT = "Data violates NOT NULL constraint with no default";
const std::string INPUT_ERROR_ODD_VARBINARY_LENGTH = "VarBinary column is incomplete; odd number of bytes; ";
const std::string INPUT_ERROR_STRING_TOO_LONG = "Character data exceeds max field length; ";
const char NULL_CHAR = 'N';
const char* NULL_VALUE_STRING = "NULL";
const char NULL_AUTO_INC_0 = '0';
const unsigned long long NULL_AUTO_INC_0_BINARY = 0;
const char NEWLINE_CHAR = '\n';

// Enumeration states related to parsing a column value
enum FieldParsingState
{
  FLD_PARSE_LEADING_CHAR_STATE = 1,   // parsing leading character
  FLD_PARSE_ENCLOSED_STATE = 2,       // parsing an enclosed column value
  FLD_PARSE_TRAILING_CHAR_STATE = 3,  // parsing bytes after an
  //   enclosed column value
  FLD_PARSE_NORMAL_STATE = 4  // parsing non-enclosed column value
};

//------------------------------------------------------------------------------
// Expand pRowData to size "newArrayCapacity", preserving contents, and
// deleting old pointer
//------------------------------------------------------------------------------
inline void resizeRowDataArray(char** pRowData, unsigned int dataLength, unsigned int newArrayCapacity)
{
  char* tmpRaw = new char[newArrayCapacity];
  memcpy(tmpRaw, *pRowData, dataLength);
  delete[] * pRowData;
  *pRowData = tmpRaw;
}

}  // namespace

//#define DEBUG_TOKEN_PARSING 1

namespace WriteEngine
{
//------------------------------------------------------------------------------
// BulkLoadBuffer constructor
//------------------------------------------------------------------------------
BulkLoadBuffer::BulkLoadBuffer(unsigned numberOfCols, unsigned bufferSize, Log* logger, int bufferId,
                               const std::string& tableName, const JobFieldRefList& jobFieldRefList)
 : fOverflowSize(0)
 , fParseComplete(0)
 , fTotalRows(0)
 , fStartRow(0)
 , fStartRowForLogging(0)
 , fAutoIncGenCount(0)
 , fAutoIncNextValue(0)
 , fReadSize(0)
 , fLog(logger)
 , fNullStringMode(false)
 , fEnclosedByChar('\0')
 , fEscapeChar('\\')
 , fBufferId(bufferId)
 , fTableName(tableName)
 , fbTruncationAsError(false)
 , fImportDataMode(IMPORT_DATA_TEXT)
 , fTimeZone(dataconvert::systemTimeZoneOffset())
 , fFixedBinaryRecLen(0)
{
  fData = new char[bufferSize];
  fOverflowBuf = NULL;
  fStatusBLB = WriteEngine::NEW;
  fNumberOfColumns = numberOfCols;
  fBufferSize = bufferSize;

  fColumnLocks.clear();

  fTokens = 0;

  fRowStatus.clear();
  fErrRows.clear();

  struct LockInfo info;
  info.locker = -1;
  info.status = WriteEngine::NEW;

  fColumnLocks.resize(numberOfCols);
  fColumnLocks.assign(fNumberOfColumns, info);

  fTotalReadRowsParser = 0;
  fStartRowParser = 0;
  fDataParser = 0;
  fTokensParser = 0;
  fStartRowForLoggingParser = 0;
  fAutoIncGenCountParser = 0;
  fNumFieldsInFile = 0;
  fNumColsInFile = 0;

  // Count the total number of fields in the input file (fNumFieldsInFile)
  // and the number of db columns that will be loaded from those fields
  // (fNumColsInFile).  Keep in mind that fNumColsInFile may be less than
  // fNumFieldsInFile, because there may be fields we are to ignore, and/or
  // some db columns may get default loaded without a corresponding field
  // in the input file.
  fFieldList.resize(jobFieldRefList.size());

  for (unsigned k = 0; k < jobFieldRefList.size(); k++)
  {
    fFieldList[k] = jobFieldRefList[k];

    switch (jobFieldRefList[k].fFldColType)
    {
      case BULK_FLDCOL_COLUMN_FIELD:
      {
        fNumColsInFile++;
        fNumFieldsInFile++;
        break;
      }

      case BULK_FLDCOL_IGNORE_FIELD:
      {
        fNumFieldsInFile++;
        break;
      }

      case BULK_FLDCOL_COLUMN_DEFAULT:
      default:
      {
        break;
      }
    }
  }
}

//------------------------------------------------------------------------------
// BulkLoadBuffer destructor
//------------------------------------------------------------------------------
BulkLoadBuffer::~BulkLoadBuffer()
{
  if (fData != NULL)
    delete[] fData;

  if (fOverflowBuf != NULL)
    delete[] fOverflowBuf;

  fColumnLocks.clear();

  if (fTokens != NULL)
  {
    for (unsigned int i = 0; i < fTotalRows; ++i)
    {
      delete[] fTokens[i];
    }

    delete[] fTokens;
  }

  fRowStatus.clear();
  fErrRows.clear();
}

//------------------------------------------------------------------------------
// Resets state of buffer.
//------------------------------------------------------------------------------
void BulkLoadBuffer::reset()
{
  fStartRow = fTotalReadRows = fTotalReadRowsForLog = 0;
  fAutoIncGenCount = 0;
}

//------------------------------------------------------------------------------
// Resets state of buffer's column locks.
//------------------------------------------------------------------------------
void BulkLoadBuffer::resetColumnLocks()
{
  fParseComplete = 0;

  struct LockInfo info;
  fColumnLocks.assign(fNumberOfColumns, info);
}

//------------------------------------------------------------------------------
// Copy overflow leftover from previous buffer into the start of "this" buffer
//------------------------------------------------------------------------------
void BulkLoadBuffer::copyOverflow(const BulkLoadBuffer& buffer)
{
  if (fOverflowBuf != NULL)
  {
    delete[] fOverflowBuf;
    fOverflowBuf = NULL;
  }

  fOverflowSize = buffer.fOverflowSize;

  if (fOverflowSize != 0)
  {
    fOverflowBuf = new char[buffer.fOverflowSize];
    memcpy(fOverflowBuf, buffer.fOverflowBuf, buffer.fOverflowSize);
  }
}

//------------------------------------------------------------------------------
// Parse/convert the given "field" value based on the specified length and type.
// field        (in)     - the input field value to be parsed
// fieldLength  (in)     - number of bytes of data in "field"
// nullFlag     (in)     - indicates if NULL value is to be assigned to "output"
//                         rather than parsing the data in "field"
// output       (out)    - the parsed value taken from "field"
// column       (in)     - column information for the column we are parsing
// bufStats:
// minBufferVal (in/out) - ongoing min value for the Read buffer we are parsing
// maxBufferVal (in/out) - ongoing max value for the Read buffer we are parsing
// satCount     (in/out) - ongoing saturation row count for buffer being parsed
//------------------------------------------------------------------------------
void BulkLoadBuffer::convert(char* field, int fieldLength, bool nullFlag, unsigned char* output,
                             const JobColumn& column, BLBufferStats& bufStats)
{
  char biVal;
  int iVal;
  float fVal;
  double dVal;
  short siVal;
  void* pVal;
  int32_t iDate;
  char charTmpBuf[MAX_COLUMN_BOUNDARY + 1] = {0};
  long long llVal = 0, llDate = 0;
  int128_t bigllVal = 0;
  uint64_t tmp64;
  uint32_t tmp32;
  uint8_t ubiVal;
  uint16_t usiVal;
  uint32_t uiVal;
  uint64_t ullVal;

  int width = column.width;

  //--------------------------------------------------------------------------
  // Parse based on column data type
  //--------------------------------------------------------------------------
  switch (column.weType)
  {
    //----------------------------------------------------------------------
    // FLOAT
    //----------------------------------------------------------------------
    case WriteEngine::WR_FLOAT:
    {
      if (nullFlag)
      {
        if (column.fWithDefault)
        {
          fVal = column.fDefaultDbl;
          pVal = &fVal;
        }
        else
        {
          tmp32 = joblist::FLOATNULL;
          pVal = &tmp32;
        }
      }
      else
      {
        float minFltSat = column.fMinDblSat;
        float maxFltSat = column.fMaxDblSat;

        if (fImportDataMode != IMPORT_DATA_TEXT)
        {
          memcpy(&fVal, field, sizeof(fVal));

          if (isnan(fVal))
          {
            if (signbit(fVal))
              fVal = minFltSat;
            else
              fVal = maxFltSat;

            bufStats.satCount++;
          }
          else
          {
            if (fVal > maxFltSat)
            {
              fVal = maxFltSat;
              bufStats.satCount++;
            }
            else if (fVal < minFltSat)
            {
              fVal = minFltSat;
              bufStats.satCount++;
            }
          }
        }
        else
        {
          errno = 0;

          fVal = strtof(field, 0);

          if (errno == ERANGE)
          {
            if (abs(fVal) == HUGE_VALF)
            {
              if (fVal > 0)
                fVal = maxFltSat;
              else
                fVal = minFltSat;

              bufStats.satCount++;
            }
            else
              fVal = 0;
          }
          else
          {
            if (fVal > maxFltSat)
            {
              fVal = maxFltSat;
              bufStats.satCount++;
            }
            else if (fVal < minFltSat)
            {
              fVal = minFltSat;
              bufStats.satCount++;
            }
            if (fVal == 0 && isTrueWord(const_cast<const char*>(field), fieldLength))
            {
              fVal = 1;
            }
          }
        }

        pVal = &fVal;
      }

      break;
    }

    //----------------------------------------------------------------------
    // DOUBLE
    //----------------------------------------------------------------------
    case WriteEngine::WR_DOUBLE:
    {
      if (nullFlag)
      {
        if (column.fWithDefault)
        {
          dVal = column.fDefaultDbl;
          pVal = &dVal;
        }
        else
        {
          tmp64 = joblist::DOUBLENULL;
          pVal = &tmp64;
        }
      }
      else
      {
        if (fImportDataMode != IMPORT_DATA_TEXT)
        {
          memcpy(&dVal, field, sizeof(dVal));

          if (std::isnan(dVal))
          {
            if (signbit(dVal))
              dVal = column.fMinDblSat;
            else
              dVal = column.fMaxDblSat;

            bufStats.satCount++;
          }
          else
          {
            if (dVal > column.fMaxDblSat)
            {
              dVal = column.fMaxDblSat;
              bufStats.satCount++;
            }
            else if (dVal < column.fMinDblSat)
            {
              dVal = column.fMinDblSat;
              bufStats.satCount++;
            }
          }
        }
        else
        {
          errno = 0;

          dVal = strtod(field, 0);

          if (errno == ERANGE)
          {
            if (abs(dVal) == HUGE_VALL)
            {
              if (dVal > 0)
                dVal = column.fMaxDblSat;
              else
                dVal = column.fMinDblSat;

              bufStats.satCount++;
            }
            else
              dVal = 0;
          }
          else
          {
            if (dVal > column.fMaxDblSat)
            {
              dVal = column.fMaxDblSat;
              bufStats.satCount++;
            }
            else if (dVal < column.fMinDblSat)
            {
              dVal = column.fMinDblSat;
              bufStats.satCount++;
            }
            else if (dVal == 0 && isTrueWord(const_cast<const char*>(field), fieldLength))
            {
              dVal = 1;
            }
          }
        }

        pVal = &dVal;
      }

      break;
    }

    //----------------------------------------------------------------------
    // CHARACTER
    //----------------------------------------------------------------------
    case WriteEngine::WR_CHAR:
    {
      if (nullFlag)
      {
        if (column.fWithDefault)
        {
          int defLen = column.fDefaultChr.length();
          const char* defData = column.fDefaultChr.str();

          if (defLen > column.definedWidth)
            memcpy(charTmpBuf, defData, column.definedWidth);
          else
            memcpy(charTmpBuf, defData, defLen);

          // fall through to update saturation and min/max
        }
        else
        {
          idbassert(width <= 8);

          for (int i = 0; i < width - 1; i++)
          {
            charTmpBuf[i] = '\377';
          }

          charTmpBuf[width - 1] = '\376';

          pVal = charTmpBuf;
          break;
        }
      }
      else
      {
        // truncate string if it is too long
        // @Bug 3040.  Use definedWidth for the data truncation to keep
        // from storing characters beyond the column's defined width.
        // It contains the column definition width rather than the bytes
        // on disk (e.g. 5 for a varchar(5) instead of 8).
        if (column.cs->mbmaxlen > 1)
        {
          const CHARSET_INFO* cs = column.cs;
          const char* start = (const char*) field;
          const char* end = (const char*)(field + fieldLength);
          size_t numChars = cs->numchars(start, end);
          size_t maxCharLength = column.definedWidth / cs->mbmaxlen;

          if (numChars > maxCharLength)
          {
            MY_STRCOPY_STATUS status;
            cs->well_formed_char_length(start, end, maxCharLength, &status);
            fieldLength = status.m_source_end_pos - start;
            bufStats.satCount++;
          }
        }
        else // cs->mbmaxlen == 1
        {
          if (fieldLength > column.definedWidth)
          {
            fieldLength = column.definedWidth;
            bufStats.satCount++;
          }
        }

        memcpy(charTmpBuf, field, fieldLength);
      }

      // Swap byte order before comparing character string
      // Compare must be unsigned
      uint64_t compChar = uint64ToStr(*(reinterpret_cast<uint64_t*>(charTmpBuf)));
      int64_t binChar = static_cast<int64_t>(compChar);

      // Update min/max range
      uint64_t minVal = static_cast<uint64_t>(bufStats.minBufferVal);
      uint64_t maxVal = static_cast<uint64_t>(bufStats.maxBufferVal);
      if (compChar < minVal)
        bufStats.minBufferVal = binChar;
      if (compChar > maxVal)
        bufStats.maxBufferVal = binChar;

      pVal = charTmpBuf;
      // cout << "In convert: fieldLength = " << fieldLength <<endl;
      break;
    }

    //----------------------------------------------------------------------
    // SHORT INT
    //----------------------------------------------------------------------
    case WriteEngine::WR_SHORT:
    {
      long long origVal;
      bool bSatVal = false;

      if (nullFlag)
      {
        if (!column.autoIncFlag)
        {
          if (column.fWithDefault)
          {
            origVal = column.fDefaultInt;
            // fall through to update saturation and min/max
          }
          else
          {
            siVal = joblist::SMALLINTNULL;
            pVal = &siVal;
            break;
          }
        }
        else
        {
          origVal = fAutoIncNextValue++;
        }
      }
      else
      {
        if (fImportDataMode != IMPORT_DATA_TEXT)
        {
          short int siVal2;
          memcpy(&siVal2, field, sizeof(siVal2));
          origVal = siVal2;
        }
        else
        {
          if ((column.dataType == CalpontSystemCatalog::DECIMAL) ||
              (column.dataType == CalpontSystemCatalog::UDECIMAL))
          {
            // errno is initialized and set in convertDecimalString
            origVal = Convertor::convertDecimalString(field, fieldLength, column.scale);
          }
          else
          {
            errno = 0;
            origVal = strtol(field, 0, 10);
          }

          if (errno == ERANGE)
            bSatVal = true;
        }
      }

      // Saturate the value
      if (origVal < column.fMinIntSat)
      {
        origVal = column.fMinIntSat;
        bSatVal = true;
      }
      else if (origVal > static_cast<int64_t>(column.fMaxIntSat))
      {
        origVal = static_cast<int64_t>(column.fMaxIntSat);
        bSatVal = true;
      }
      else if (origVal == 0 && isTrueWord(const_cast<const char*>(field), fieldLength))
      {
        origVal = 1;
      }

      if (bSatVal)
        bufStats.satCount++;

      // Update min/max range
      if (origVal < bufStats.minBufferVal)
        bufStats.minBufferVal = origVal;

      if (origVal > bufStats.maxBufferVal)
        bufStats.maxBufferVal = origVal;

      siVal = origVal;
      pVal = &siVal;

      break;
    }

    //----------------------------------------------------------------------
    // UNSIGNED SHORT INT
    //----------------------------------------------------------------------
    case WriteEngine::WR_USHORT:
    {
      int64_t origVal = 0;
      bool bSatVal = false;

      if (nullFlag)
      {
        if (!column.autoIncFlag)
        {
          if (column.fWithDefault)
          {
            origVal = static_cast<int64_t>(column.fDefaultUInt);
            // fall through to update saturation and min/max
          }
          else
          {
            usiVal = joblist::USMALLINTNULL;
            pVal = &usiVal;
            break;
          }
        }
        else
        {
          origVal = fAutoIncNextValue++;
        }
      }
      else
      {
        if (fImportDataMode != IMPORT_DATA_TEXT)
        {
          unsigned short int siVal2;
          memcpy(&siVal2, field, sizeof(siVal2));
          origVal = siVal2;
        }
        else
        {
          errno = 0;

          origVal = strtoll(field, 0, 10);

          if (errno == ERANGE)
            bSatVal = true;
        }
      }

      // Saturate the value (saturates any negative value to 0)
      if (origVal < column.fMinIntSat)
      {
        origVal = column.fMinIntSat;
        bSatVal = true;
      }
      else if (origVal > static_cast<int64_t>(column.fMaxIntSat))
      {
        origVal = static_cast<int64_t>(column.fMaxIntSat);
        bSatVal = true;
      }
      else if (origVal == 0 && isTrueWord(const_cast<const char*>(field), fieldLength))
      {
        origVal = 1;
      }

      if (bSatVal)
        bufStats.satCount++;

      // Update min/max range
      uint64_t uVal = origVal;

      if (uVal < static_cast<uint64_t>(bufStats.minBufferVal))
        bufStats.minBufferVal = origVal;

      if (uVal > static_cast<uint64_t>(bufStats.maxBufferVal))
        bufStats.maxBufferVal = origVal;

      usiVal = origVal;
      pVal = &usiVal;

      break;
    }

    //----------------------------------------------------------------------
    // TINY INT
    //----------------------------------------------------------------------
    case WriteEngine::WR_BYTE:
    {
      long long origVal;
      bool bSatVal = false;

      if (nullFlag)
      {
        if (!column.autoIncFlag)
        {
          if (column.fWithDefault)
          {
            origVal = column.fDefaultInt;
            // fall through to update saturation and min/max
          }
          else
          {
            biVal = joblist::TINYINTNULL;
            pVal = &biVal;
            break;
          }
        }
        else
        {
          origVal = fAutoIncNextValue++;
        }
      }
      else
      {
        if (fImportDataMode != IMPORT_DATA_TEXT)
        {
          char biVal2;
          memcpy(&biVal2, field, sizeof(biVal2));
          origVal = biVal2;
        }
        else
        {
          if (isTrueWord(const_cast<const char*>(field), fieldLength))
          {
            strcpy(field, "1");
            fieldLength = 1;
          }

          if ((column.dataType == CalpontSystemCatalog::DECIMAL) ||
              (column.dataType == CalpontSystemCatalog::UDECIMAL))
          {
            // errno is initialized and set in convertDecimalString
            origVal = Convertor::convertDecimalString(field, fieldLength, column.scale);
          }
          else
          {
            errno = 0;
            origVal = strtol(field, 0, 10);
          }

          if (errno == ERANGE)
            bSatVal = true;
        }
      }

      // Saturate the value
      if (origVal < column.fMinIntSat)
      {
        origVal = column.fMinIntSat;
        bSatVal = true;
      }
      else if (origVal > static_cast<int64_t>(column.fMaxIntSat))
      {
        origVal = static_cast<int64_t>(column.fMaxIntSat);
        bSatVal = true;
      }

      if (bSatVal)
        bufStats.satCount++;

      // Update min/max range
      if (origVal < bufStats.minBufferVal)
        bufStats.minBufferVal = origVal;

      if (origVal > bufStats.maxBufferVal)
        bufStats.maxBufferVal = origVal;

      biVal = origVal;
      pVal = &biVal;

      break;
    }

    //----------------------------------------------------------------------
    // UNSIGNED TINY INT
    //----------------------------------------------------------------------
    case WriteEngine::WR_UBYTE:
    {
      int64_t origVal = 0;
      bool bSatVal = false;

      if (nullFlag)
      {
        if (!column.autoIncFlag)
        {
          if (column.fWithDefault)
          {
            origVal = static_cast<int64_t>(column.fDefaultUInt);
            // fall through to update saturation and min/max
          }
          else
          {
            ubiVal = joblist::UTINYINTNULL;
            pVal = &ubiVal;
            break;
          }
        }
        else
        {
          origVal = fAutoIncNextValue++;
        }
      }
      else
      {
        if (fImportDataMode != IMPORT_DATA_TEXT)
        {
          uint8_t biVal2;
          memcpy(&biVal2, field, sizeof(biVal2));
          origVal = biVal2;
        }
        else
        {
          errno = 0;

          origVal = strtoll(field, 0, 10);

          if (errno == ERANGE)
            bSatVal = true;
        }
      }

      // Saturate the value (saturates any negative value to 0)
      if (origVal < column.fMinIntSat)
      {
        origVal = column.fMinIntSat;
        bSatVal = true;
      }
      else if (origVal > static_cast<int64_t>(column.fMaxIntSat))
      {
        origVal = static_cast<int64_t>(column.fMaxIntSat);
        bSatVal = true;
      }
      else if (origVal == 0 && isTrueWord(const_cast<const char*>(field), fieldLength))
      {
        origVal = 1;
      }

      if (bSatVal)
        bufStats.satCount++;

      // Update min/max range
      uint64_t uVal = origVal;

      if (uVal < static_cast<uint64_t>(bufStats.minBufferVal))
        bufStats.minBufferVal = origVal;

      if (uVal > static_cast<uint64_t>(bufStats.maxBufferVal))
        bufStats.maxBufferVal = origVal;

      ubiVal = origVal;
      pVal = &ubiVal;

      break;
    }

    //----------------------------------------------------------------------
    // BIG INT
    //----------------------------------------------------------------------
    case WriteEngine::WR_LONGLONG:
    {
      bool bSatVal = false;

      if (column.dataType != CalpontSystemCatalog::DATETIME &&
          column.dataType != CalpontSystemCatalog::TIMESTAMP && column.dataType != CalpontSystemCatalog::TIME)
      {
        if (nullFlag)
        {
          if (!column.autoIncFlag)
          {
            if (column.fWithDefault)
            {
              llVal = column.fDefaultInt;
              // fall through to update saturation and min/max
            }
            else
            {
              llVal = joblist::BIGINTNULL;
              pVal = &llVal;
              break;
            }
          }
          else
          {
            llVal = fAutoIncNextValue++;
          }
        }
        else
        {
          if (fImportDataMode != IMPORT_DATA_TEXT)
          {
            memcpy(&llVal, field, sizeof(llVal));
          }
          else
          {
            if (isTrueWord(const_cast<const char*>(field), fieldLength))
            {
              strcpy(field, "1");
              fieldLength = 1;
            }

            if ((column.dataType == CalpontSystemCatalog::DECIMAL) ||
                (column.dataType == CalpontSystemCatalog::UDECIMAL))
            {
              // errno is initialized and set in convertDecimalString
              llVal = Convertor::convertDecimalString(field, fieldLength, column.scale);
            }
            else
            {
              errno = 0;
              llVal = strtoll(field, 0, 10);
            }
          }

          if (errno == ERANGE)
            bSatVal = true;
        }

        // Saturate the value
        if (llVal < column.fMinIntSat)
        {
          llVal = column.fMinIntSat;
          bSatVal = true;
        }
        else if (llVal > static_cast<int64_t>(column.fMaxIntSat))
        {
          // llVal can be > fMaxIntSat if this is a decimal column
          llVal = static_cast<int64_t>(column.fMaxIntSat);
          bSatVal = true;
        }

        if (bSatVal)
          bufStats.satCount++;

        // Update min/max range
        if (llVal < bufStats.minBufferVal)
          bufStats.minBufferVal = llVal;

        if (llVal > bufStats.maxBufferVal)
          bufStats.maxBufferVal = llVal;

        pVal = &llVal;
      }
      else if (column.dataType == CalpontSystemCatalog::TIME)
      {
        // time conversion
        int rc = 0;

        if (nullFlag)
        {
          if (column.fWithDefault)
          {
            llDate = column.fDefaultInt;
            // fall through to update saturation and min/max
          }
          else
          {
            llDate = joblist::TIMENULL;
            pVal = &llDate;
            break;
          }
        }
        else
        {
          if (fImportDataMode != IMPORT_DATA_TEXT)
          {
            memcpy(&llDate, field, sizeof(llDate));

            if (!dataconvert::DataConvert::isColumnTimeValid(llDate))
              rc = -1;
          }
          else
          {
            llDate = dataconvert::DataConvert::convertColumnTime(field, dataconvert::CALPONTTIME_ENUM, rc,
                                                                 fieldLength);
          }
        }

        if (rc == 0)
        {
          if (llDate < bufStats.minBufferVal)
            bufStats.minBufferVal = llDate;

          if (llDate > bufStats.maxBufferVal)
            bufStats.maxBufferVal = llDate;
        }
        else
        {
          bufStats.satCount++;
        }

        pVal = &llDate;
      }
      else if (column.dataType == CalpontSystemCatalog::TIMESTAMP)
      {
        // timestamp conversion
        int rc = 0;

        if (nullFlag)
        {
          if (column.fWithDefault)
          {
            llDate = column.fDefaultInt;
            // fall through to update saturation and min/max
          }
          else
          {
            llDate = joblist::TIMESTAMPNULL;
            pVal = &llDate;
            break;
          }
        }
        else
        {
          if (fImportDataMode != IMPORT_DATA_TEXT)
          {
            memcpy(&llDate, field, sizeof(llDate));

            if (!dataconvert::DataConvert::isColumnTimeStampValid(llDate))
              rc = -1;
          }
          else
          {
            llDate = dataconvert::DataConvert::convertColumnTimestamp(
                field, dataconvert::CALPONTDATETIME_ENUM, rc, fieldLength, fTimeZone);
          }
        }

        if (rc == 0)
        {
          if (llDate < bufStats.minBufferVal)
            bufStats.minBufferVal = llDate;

          if (llDate > bufStats.maxBufferVal)
            bufStats.maxBufferVal = llDate;
        }
        else
        {
          llDate = 0;
          bufStats.satCount++;
        }

        pVal = &llDate;
      }
      else
      {
        // datetime conversion
        int rc = 0;

        if (nullFlag)
        {
          if (column.fWithDefault)
          {
            llDate = column.fDefaultInt;
            // fall through to update saturation and min/max
          }
          else
          {
            llDate = joblist::DATETIMENULL;
            pVal = &llDate;
            break;
          }
        }
        else
        {
          if (fImportDataMode != IMPORT_DATA_TEXT)
          {
            memcpy(&llDate, field, sizeof(llDate));

            if (!dataconvert::DataConvert::isColumnDateTimeValid(llDate))
              rc = -1;
          }
          else
          {
            llDate = dataconvert::DataConvert::convertColumnDatetime(field, dataconvert::CALPONTDATETIME_ENUM,
                                                                     rc, fieldLength);
          }
        }

        if (rc == 0)
        {
          if (llDate < bufStats.minBufferVal)
            bufStats.minBufferVal = llDate;

          if (llDate > bufStats.maxBufferVal)
            bufStats.maxBufferVal = llDate;
        }
        else
        {
          llDate = 0;
          bufStats.satCount++;
        }

        pVal = &llDate;
      }

      break;
    }

    //----------------------------------------------------------------------
    // WIDE DECIMAL
    //----------------------------------------------------------------------
    case WriteEngine::WR_BINARY:
    {
      bool bSatVal = false;

      if (nullFlag)
      {
        if (!column.autoIncFlag)
        {
          if (column.fWithDefault)
          {
            bigllVal = column.fDefaultWideDecimal;
            // fall through to update saturation and min/max
          }
          else
          {
            bigllVal = datatypes::Decimal128Null;
            pVal = &bigllVal;
            break;
          }
        }
        else
        {
          // TODO MCOL-641 Add support for int128_t version of
          // fAutoIncNextValue
          bigllVal = fAutoIncNextValue++;
        }
      }
      else
      {
        if (fImportDataMode != IMPORT_DATA_TEXT)
        {
          memcpy(&bigllVal, field, sizeof(bigllVal));
        }
        else
        {
          if (isTrueWord(const_cast<const char*>(field), fieldLength))
          {
            strcpy(field, "1");
            fieldLength = 1;
          }

          bool dummy = false;
          // Value saturation to 9999... or -9999... is handled by
          // number_int_value(), and the bSatVal flag is set to true
          dataconvert::number_int_value(
              string(field), column.dataType,
              datatypes::SystemCatalog::TypeAttributesStd(column.width, column.scale, column.precision),
              dummy, false, bigllVal, &bSatVal);
        }
      }

      if (bSatVal)
        bufStats.satCount++;

      // Update min/max range
      if (bigllVal < bufStats.bigMinBufferVal)
        bufStats.bigMinBufferVal = bigllVal;

      if (bigllVal > bufStats.bigMaxBufferVal)
        bufStats.bigMaxBufferVal = bigllVal;

      pVal = &bigllVal;

      break;
    }

    //----------------------------------------------------------------------
    // UNSIGNED BIG INT
    //----------------------------------------------------------------------
    case WriteEngine::WR_ULONGLONG:
    {
      bool bSatVal = false;

      if (nullFlag)
      {
        if (!column.autoIncFlag)
        {
          if (column.fWithDefault)
          {
            ullVal = column.fDefaultUInt;
            // fall through to update saturation and min/max
          }
          else
          {
            ullVal = joblist::UBIGINTNULL;
            pVal = &ullVal;
            break;
          }
        }
        else
        {
          ullVal = fAutoIncNextValue++;
        }
      }
      else
      {
        if (fImportDataMode != IMPORT_DATA_TEXT)
        {
          memcpy(&ullVal, field, sizeof(ullVal));
        }
        else
        {
          // Check for negative. strtoull doesn't do this for us.
          // I considered using boost::trim_left here, but part of the
          // exercise is to minimize cpu cycles, so I do it the old
          // fashioned way.  isspace() uses more cycles than direct
          // compare to ' ', '\t', etc.  but the payoff is that it
          // works with Locale, so it ought to work well with utf-8
          // input.
          int idx1;

          for (idx1 = 0; idx1 < fieldLength; idx1++)
          {
            if (!isspace(field[idx1]))
              break;
          }

          if ((idx1 < fieldLength) && (field[idx1] == '-'))
          {
            ullVal = static_cast<uint64_t>(column.fMinIntSat);
            bSatVal = true;
          }
          else
          {
            errno = 0;

            ullVal = strtoull(field, 0, 10);

            if (errno == ERANGE)
              bSatVal = true;
          }
        }
      }

      // Saturate the value
      if (ullVal > column.fMaxIntSat)
      {
        ullVal = column.fMaxIntSat;
        bSatVal = true;
      }
      else if (ullVal == 0 && isTrueWord(const_cast<const char*>(field), fieldLength))
      {
        ullVal = 1;
      }

      if (bSatVal)
        bufStats.satCount++;

      // Update min/max range
      if (ullVal < static_cast<uint64_t>(bufStats.minBufferVal))
        bufStats.minBufferVal = static_cast<int64_t>(ullVal);

      if (ullVal > static_cast<uint64_t>(bufStats.maxBufferVal))
        bufStats.maxBufferVal = static_cast<int64_t>(ullVal);

      pVal = &ullVal;
      break;
    }

    //----------------------------------------------------------------------
    // UNSIGNED MEDIUM INTEGER AND UNSIGNED INTEGER
    //----------------------------------------------------------------------
    case WriteEngine::WR_UMEDINT:
    case WriteEngine::WR_UINT:
    {
      int64_t origVal;
      bool bSatVal = false;

      if (nullFlag)
      {
        if (!column.autoIncFlag)
        {
          if (column.fWithDefault)
          {
            origVal = static_cast<int64_t>(column.fDefaultUInt);
            // fall through to update saturation and min/max
          }
          else
          {
            uiVal = joblist::UINTNULL;
            pVal = &uiVal;
            break;
          }
        }
        else
        {
          origVal = fAutoIncNextValue++;
        }
      }
      else
      {
        if (fImportDataMode != IMPORT_DATA_TEXT)
        {
          unsigned int iVal2;
          memcpy(&iVal2, field, sizeof(iVal2));
          origVal = iVal2;
        }
        else
        {
          errno = 0;

          origVal = strtoll(field, 0, 10);

          if (errno == ERANGE)
            bSatVal = true;
        }
      }

      // Saturate the value (saturates any negative value to 0)
      if (origVal < column.fMinIntSat)
      {
        origVal = column.fMinIntSat;
        bSatVal = true;
      }
      else if (origVal > static_cast<int64_t>(column.fMaxIntSat))
      {
        origVal = static_cast<int64_t>(column.fMaxIntSat);
        bSatVal = true;
      }
      else if (origVal == 0 && isTrueWord(const_cast<const char*>(field), fieldLength))
      {
        origVal = 1;
      }

      if (bSatVal)
        bufStats.satCount++;

      // Update min/max range
      uint64_t uVal = origVal;

      if (uVal < static_cast<uint64_t>(bufStats.minBufferVal))
        bufStats.minBufferVal = origVal;

      if (uVal > static_cast<uint64_t>(bufStats.maxBufferVal))
        bufStats.maxBufferVal = origVal;

      uiVal = origVal;
      pVal = &uiVal;
      break;
    }

    //----------------------------------------------------------------------
    // MEDIUM INTEGER AND INTEGER
    //----------------------------------------------------------------------
    case WriteEngine::WR_MEDINT:
    case WriteEngine::WR_INT:
    default:
    {
      if (column.dataType != CalpontSystemCatalog::DATE)
      {
        long long origVal;
        bool bSatVal = false;

        if (nullFlag)
        {
          if (!column.autoIncFlag)
          {
            if (column.fWithDefault)
            {
              origVal = column.fDefaultInt;
              // fall through to update saturation and min/max
            }
            else
            {
              iVal = joblist::INTNULL;
              pVal = &iVal;
              break;
            }
          }
          else
          {
            origVal = fAutoIncNextValue++;
          }
        }
        else
        {
          if (fImportDataMode != IMPORT_DATA_TEXT)
          {
            int iVal2;
            memcpy(&iVal2, field, sizeof(iVal2));
            origVal = iVal2;
          }
          else
          {
            if (isTrueWord(const_cast<const char*>(field), fieldLength))
            {
              strcpy(field, "1");
              fieldLength = 1;
            }

            if ((column.dataType == CalpontSystemCatalog::DECIMAL) ||
                (column.dataType == CalpontSystemCatalog::UDECIMAL))
            {
              // errno is initialized and set in convertDecimalString
              origVal = Convertor::convertDecimalString(field, fieldLength, column.scale);
            }
            else
            {
              errno = 0;
              origVal = strtol(field, 0, 10);
            }

            if (errno == ERANGE)
              bSatVal = true;
          }
        }

        // Saturate the value
        if (origVal < column.fMinIntSat)
        {
          origVal = column.fMinIntSat;
          bSatVal = true;
        }
        else if (origVal > static_cast<int64_t>(column.fMaxIntSat))
        {
          origVal = static_cast<int64_t>(column.fMaxIntSat);
          bSatVal = true;
        }

        if (bSatVal)
          bufStats.satCount++;

        // Update min/max range
        if (origVal < bufStats.minBufferVal)
          bufStats.minBufferVal = origVal;

        if (origVal > bufStats.maxBufferVal)
          bufStats.maxBufferVal = origVal;

        iVal = (int)origVal;
        pVal = &iVal;
      }
      else
      {
        // date conversion
        int rc = 0;

        if (nullFlag)
        {
          if (column.fWithDefault)
          {
            iDate = column.fDefaultInt;
            // fall through to update saturation and min/max
          }
          else
          {
            iDate = joblist::DATENULL;
            pVal = &iDate;
            break;
          }
        }
        else
        {
          if (fImportDataMode != IMPORT_DATA_TEXT)
          {
            memcpy(&iDate, field, sizeof(iDate));

            if (!dataconvert::DataConvert::isColumnDateValid(iDate))
              rc = -1;
          }
          else
          {
            iDate = dataconvert::DataConvert::convertColumnDate(field, dataconvert::CALPONTDATE_ENUM, rc,
                                                                fieldLength);
          }
        }

        if (rc == 0)
        {
          if (iDate < bufStats.minBufferVal)
            bufStats.minBufferVal = iDate;

          if (iDate > bufStats.maxBufferVal)
            bufStats.maxBufferVal = iDate;
        }
        else
        {
          iDate = 0;
          bufStats.satCount++;
        }

        pVal = &iDate;
      }

      break;
    }
  }

  memcpy(output, pVal, width);
}

//------------------------------------------------------------------------------
// Parse the contents of the Read buffer based on whether it is a dictionary
// column or not.
//------------------------------------------------------------------------------
int BulkLoadBuffer::parse(ColumnInfo& columnInfo)
{
  int rc = NO_ERROR;

  // Rather than locking fSyncUpdatesBLB for the entire life of parse(),
  // we only briefly lock, and force a synchronization with the relevant
  // class variables from reader threads (by copying to Parser specific
  // variables).  It should be okay to reference a copy of these variables
  // as no other thread should be changing them while we are in parse().
  {
    boost::mutex::scoped_lock lock(fSyncUpdatesBLB);
    fTotalReadRowsParser = fTotalReadRows;
    fStartRowParser = fStartRow;
    fDataParser = fData;
    fTokensParser = fTokens;
    fStartRowForLoggingParser = fStartRowForLogging;
    fAutoIncGenCountParser = fAutoIncGenCount;
  }

  // Bug806 - If buffer is empty then return early.
  if (fTotalReadRowsParser == 0)
    return rc;

  // If this is the first batch of rows, create the starting DB file
  // if this PM did not have a DB file (delayed file creation).
  RETURN_ON_ERROR(columnInfo.createDelayedFileIfNeeded(fTableName));

  if (columnInfo.column.colType == COL_TYPE_DICT)
    rc = parseDict(columnInfo);
  else
    rc = parseCol(columnInfo);

  return rc;
}

//------------------------------------------------------------------------------
// Parse nonDictionary column Read buffer.  Parsed row values are added to
// fColBufferMgr, which stores them into an output buffer before writing them
// out to the applicable column segment file.
//------------------------------------------------------------------------------
int BulkLoadBuffer::parseCol(ColumnInfo& columnInfo)
{
  int rc = NO_ERROR;

  // Parse the data and fill up a buffer; which is written to output file
  uint32_t nRowsParsed;

  if (fLog->isDebug(DEBUG_2))
  {
    ostringstream oss;
    oss << "ColResSecIn:  OID-" << columnInfo.column.mapOid << "; StartRID/Rows: " << fStartRowParser << " "
        << fTotalReadRowsParser;
    fLog->logMsg(oss.str(), MSGLVL_INFO2);
  }

  ColumnBufferSection* section = 0;
  RID lastInputRowInExtent;
  RETURN_ON_ERROR(columnInfo.fColBufferMgr->reserveSection(fStartRowParser, fTotalReadRowsParser, nRowsParsed,
                                                           &section, lastInputRowInExtent));

  if (nRowsParsed > 0)
  {
#ifdef PROFILE
    Stats::startParseEvent(WE_STATS_PARSE_COL);
#endif

    // Reserve auto-increment numbers we need to generate
    if ((columnInfo.column.autoIncFlag) && (fAutoIncGenCountParser > 0))
    {
      rc = columnInfo.reserveAutoIncNums(fAutoIncGenCountParser, fAutoIncNextValue);

      if (rc != NO_ERROR)
      {
        WErrorCodes ec;
        ostringstream oss;
        oss << "parseCol: error generating auto-increment values "
               "for table-"
            << fTableName << ", column-" << columnInfo.column.colName << "; OID-" << columnInfo.column.mapOid
            << "; " << ec.errorString(rc);
        fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
        BulkLoad::addErrorMsg2BrmUpdater(fTableName, oss);
        return rc;
      }
    }

    // create a buffer for the size of the rows being written.
    unsigned char* buf = new unsigned char[fTotalReadRowsParser * columnInfo.column.width];
    char* field = new char[MAX_FIELD_SIZE + 1];

    // Initialize min/max buffer values.  We initialize to a sufficient
    // range to force the first value to automatically update the range.
    // If we are managing char data, minBufferVal and maxBufferVal are
    // maintained in reverse byte order to facilitate string comparisons
    BLBufferStats bufStats(columnInfo.column.dataType);
    bool updateCPInfoPendingFlag = false;

    int tokenLength = 0;
    bool tokenNullFlag = false;

    for (uint32_t i = 0; i < fTotalReadRowsParser; ++i)
    {
      char* p = fDataParser + fTokensParser[i][columnInfo.id].start;

      if (fTokensParser[i][columnInfo.id].offset > 0)
      {
        memcpy(field, p, fTokensParser[i][columnInfo.id].offset);
        field[fTokensParser[i][columnInfo.id].offset] = '\0';
        tokenLength = fTokensParser[i][columnInfo.id].offset;
        tokenNullFlag = false;
      }
      else
      {
        field[0] = '\0';
        tokenLength = 0;
        tokenNullFlag = true;
      }

      // convert the data into appropriate format and update CP values
      convert(field, tokenLength, tokenNullFlag, buf + i * columnInfo.column.width, columnInfo.column,
              bufStats);
      updateCPInfoPendingFlag = true;

      // Update CP min/max if this is last row in this extent
      if ((fStartRowParser + i) == lastInputRowInExtent)
      {
        if (columnInfo.column.width <= 8)
        {
          columnInfo.updateCPInfo(lastInputRowInExtent, bufStats.minBufferVal, bufStats.maxBufferVal,
                                  columnInfo.column.dataType, columnInfo.column.width);
        }
        else
        {
          columnInfo.updateCPInfo(lastInputRowInExtent, bufStats.bigMinBufferVal, bufStats.bigMaxBufferVal,
                                  columnInfo.column.dataType, columnInfo.column.width);
        }

        // TODO MCOL-641 Add support here.
        if (fLog->isDebug(DEBUG_2))
        {
          ostringstream oss;
          oss << "ColRelSecOut: OID-" << columnInfo.column.mapOid
              << "; StartRID/Rows1: " << section->startRowId() << " " << i + 1
              << "; lastExtentRow: " << lastInputRowInExtent;
          parseColLogMinMax(oss, columnInfo.column.dataType, bufStats.minBufferVal, bufStats.maxBufferVal);

          fLog->logMsg(oss.str(), MSGLVL_INFO2);
        }

        lastInputRowInExtent += columnInfo.rowsPerExtent();

        if (isUnsigned(columnInfo.column.dataType))
        {
          if (columnInfo.column.width <= 8)
          {
            bufStats.minBufferVal = static_cast<int64_t>(MAX_UBIGINT);
            bufStats.maxBufferVal = static_cast<int64_t>(MIN_UBIGINT);
          }
          else
          {
            bufStats.bigMinBufferVal = -1;
            bufStats.bigMaxBufferVal = 0;
          }
          updateCPInfoPendingFlag = false;
        }
        else
        {
          if (columnInfo.column.width <= 8)
          {
            bufStats.minBufferVal = MAX_BIGINT;
            bufStats.maxBufferVal = MIN_BIGINT;
          }
          else
          {
            utils::int128Max(bufStats.bigMinBufferVal);
            utils::int128Min(bufStats.bigMaxBufferVal);
          }
          updateCPInfoPendingFlag = false;
        }
      }
    }

    if (updateCPInfoPendingFlag)
    {
      if (columnInfo.column.width <= 8)
      {
        columnInfo.updateCPInfo(lastInputRowInExtent, bufStats.minBufferVal, bufStats.maxBufferVal,
                                columnInfo.column.dataType, columnInfo.column.width);
      }
      else
      {
        columnInfo.updateCPInfo(lastInputRowInExtent, bufStats.bigMinBufferVal, bufStats.bigMaxBufferVal,
                                columnInfo.column.dataType, columnInfo.column.width);
      }
    }

    if (bufStats.satCount)  // @bug 3504: increment row saturation count
    {
      // If we don't want to allow saturated values for auto inc columns.
      // then this is where we handle it.  Too late to reject a single
      // row from the parsing thread, so we abort the job.
      // if (columnInfo.column.autoIncFlag)
      //{
      //    rc = ERR_AUTOINC_USER_OUT_OF_RANGE;
      //    WErrorCodes ec;
      //    ostringstream oss;
      //    oss << "parseCol: error with auto-increment values "
      //        "for table-" << fTableName <<
      //        ", column-" << columnInfo.column.colName <<
      //        "; OID-" << columnInfo.column.mapOid <<
      //        "; " << ec.errorString(rc);
      //    fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );
      //    return rc;
      //}
      columnInfo.incSaturatedCnt(bufStats.satCount);
    }

    delete[] field;
    section->write(buf, fTotalReadRowsParser);
    delete[] buf;
#ifdef PROFILE
    Stats::stopParseEvent(WE_STATS_PARSE_COL);
#endif

    // TODO MCOL-641 Add support here.
    if (fLog->isDebug(DEBUG_2))
    {
      ostringstream oss;
      RID rid1 = section->startRowId();
      RID rid2 = section->endRowId();
      oss << "ColRelSecOut: OID-" << columnInfo.column.mapOid << "; StartRID/Rows2: " << rid1 << " "
          << (rid2 - rid1) + 1 << "; startOffset: " << section->getStartOffset()
          << "; lastExtentRow: " << lastInputRowInExtent;
      parseColLogMinMax(oss, columnInfo.column.dataType, bufStats.minBufferVal, bufStats.maxBufferVal);

      fLog->logMsg(oss.str(), MSGLVL_INFO2);
    }

    RETURN_ON_ERROR(columnInfo.fColBufferMgr->releaseSection(section));
  }

  return rc;
}

//------------------------------------------------------------------------------
// Log the specified min/max buffer values to the log file.  This is straight
// forward for numeric types, but for character data, we have to reverse the
// order of min/max values, because they are maintained in reverse order to
// facilitate the comparison of character strings in an int64_t variable.
//------------------------------------------------------------------------------
void BulkLoadBuffer::parseColLogMinMax(ostringstream& oss, ColDataType colDataType, int64_t minBufferVal,
                                       int64_t maxBufferVal) const
{
  if (isCharType(colDataType))
  {
    // Swap/restore byte order before printing character string
    int64_t minVal = static_cast<int64_t>(uint64ToStr(static_cast<uint64_t>(minBufferVal)));
    int64_t maxVal = static_cast<int64_t>(uint64ToStr(static_cast<uint64_t>(maxBufferVal)));
    char minValStr[sizeof(int64_t) + 1];
    char maxValStr[sizeof(int64_t) + 1];
    memcpy(minValStr, &minVal, sizeof(int64_t));
    memcpy(maxValStr, &maxVal, sizeof(int64_t));
    minValStr[sizeof(int64_t)] = '\0';
    maxValStr[sizeof(int64_t)] = '\0';
    oss << "; minVal: " << minVal << "; (" << minValStr << ")"
        << "; maxVal: " << maxVal << "; (" << maxValStr << ")";
  }
  else if (isUnsigned(colDataType))
  {
    oss << "; minVal: " << static_cast<uint64_t>(minBufferVal)
        << "; maxVal: " << static_cast<uint64_t>(maxBufferVal);
  }
  else
  {
    oss << "; minVal: " << minBufferVal << "; maxVal: " << maxBufferVal;
  }
}

//------------------------------------------------------------------------------
// Parse Dictionary column Read buffer.  Parsed row values are added to
// fColBufferMgr, which stores them into an output buffer before writing them
// out to the applicable column segment (token) file.  This gets a little sticky
// here if the amount of data (in the column file) crosses an extent boundary.
// In this case, we have to split up the tokens into 2 column segment files
// and of course split up the corresponding strings into 2 different dictionary
// store files as well.
//------------------------------------------------------------------------------
int BulkLoadBuffer::parseDict(ColumnInfo& columnInfo)
{
  int rc = NO_ERROR;

  uint32_t nRowsParsed1;
  rc = parseDictSection(columnInfo, 0, fStartRowParser, fTotalReadRowsParser, nRowsParsed1);

  if (rc != NO_ERROR)
  {
    WErrorCodes ec;
    ostringstream oss;
    oss << "parseDict: error parsing section1: "
        << " OID-" << columnInfo.curCol.dataFile.fid << "; " << ec.errorString(rc);
    fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
    return rc;
  }

  //..If fTotalReadRows != nRowsParsed1 then reserveInSection() had to
  //  split up our input buffer tokens because they spanned 2 extents.
  //  After exiting reserveSection() above, we no longer have a mutex
  //  lock on the sections in the internal buffer, so you might think
  //  this could cause a race condition with more rows being added to
  //  the buffer by other parsing threads, while we are busy wrapping
  //  up the first extent and creating the second.  But since reserve-
  //  Section() only took some of the rows from the Read buffer, any
  //  other threads should be blocked waiting for us to add the remain-
  //  ing rows from "this" Read buffer into a new ColumnBufferSection.
  //  The following condition wait in reserveSection() should be keeping
  //  things stable:
  //      while((fMaxRowId + 1) != startRowId) {
  //        //Making sure that allocation are made in order
  //        fOutOfSequence.wait(lock);
  //      }

  if (fTotalReadRowsParser != nRowsParsed1)
  {
    if (fLog->isDebug(DEBUG_1))
    {
      ostringstream oss;
      oss << "parseDict breaking up bufsec for OID-" << columnInfo.curCol.dataFile.fid << "; file-"
          << columnInfo.curCol.dataFile.fSegFileName << "; totalInRows-" << fTotalReadRowsParser
          << "; rowsFlushedToEndExtent-" << nRowsParsed1;
      fLog->logMsg(oss.str(), MSGLVL_INFO2);
    }

    //..Flush the rows in the buffer that fill up the current extent
    rc = columnInfo.fColBufferMgr->intermediateFlush();

    if (rc != NO_ERROR)
    {
      WErrorCodes ec;
      ostringstream oss;
      oss << "parseDict: error flushing column: "
          << " OID-" << columnInfo.curCol.dataFile.fid << "; " << ec.errorString(rc);
      fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
      return rc;
    }

    //..See if we just finished filling in the last extent for this seg-
    //  ment token file, in which case we can truncate the corresponding
    //  dictionary store segment file. (this only affects compressed data).
    uint16_t root = columnInfo.curCol.dataFile.fDbRoot;
    uint32_t pNum = columnInfo.curCol.dataFile.fPartition;
    uint16_t sNum = columnInfo.curCol.dataFile.fSegment;
    bool bFileComplete = columnInfo.isFileComplete();

    //..Close the current segment file, and add an extent to the next
    //  segment file in the rotation sequence.  newSegmentFile is a
    //  FILE* that points to the newly opened segment file.
    rc = columnInfo.fColBufferMgr->extendTokenColumn();

    if (rc != NO_ERROR)
    {
      WErrorCodes ec;
      ostringstream oss;
      oss << "parseDict: error extending column: "
          << " OID-" << columnInfo.curCol.dataFile.fid << "; " << ec.errorString(rc);
      fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
      return rc;
    }

    //..Close current dictionary store file and open the dictionary
    //  store file that will match the newly opened column segment file.
    rc = columnInfo.closeDctnryStore(false);

    if (rc != NO_ERROR)
    {
      WErrorCodes ec;
      ostringstream oss;
      oss << "parseDict: error closing store file: "
          << " OID-" << columnInfo.column.dctnry.dctnryOid << "; " << ec.errorString(rc);
      fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
      return rc;
    }

    rc = columnInfo.openDctnryStore(false);

    if (rc != NO_ERROR)
    {
      WErrorCodes ec;
      ostringstream oss;
      oss << "parseDict: error opening store file: "
          << " OID-" << columnInfo.column.dctnry.dctnryOid << "; " << ec.errorString(rc);
      fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);

      // Ignore return code from closing file; already in error state
      columnInfo.closeDctnryStore(true);  // clean up loose ends
      return rc;
    }

    //..Now we can add the remaining rows in the current Read buffer to
    //  to the output buffer destined for the next extent we just added.
    uint32_t nRowsParsed2;
    rc = parseDictSection(columnInfo, nRowsParsed1, (fStartRowParser + nRowsParsed1),
                          (fTotalReadRowsParser - nRowsParsed1), nRowsParsed2);

    if (rc != NO_ERROR)
    {
      WErrorCodes ec;
      ostringstream oss;
      oss << "parseDict: error parsing section2: "
          << " OID-" << columnInfo.curCol.dataFile.fid << "; " << ec.errorString(rc);
      fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
      return rc;
    }

    //..We went ahead and completed all the necessary parsing to free up
    //  the buffer we were working on, so that any blocked threads can
    //  continue.  In the mean time, this thread can now go back and
    //  truncate the dctnry store file we just completed, if applicable.
    if (bFileComplete)
    {
      rc = columnInfo.truncateDctnryStore(columnInfo.column.dctnry.dctnryOid, root, pNum, sNum);

      if (rc != NO_ERROR)
        return rc;
    }
  }

  return rc;
}

//------------------------------------------------------------------------------
// Parses all or part of a Dictionary Read buffer into a ColumnBufferSection,
// depending on whether the buffer crosses an extent boundary or not.  If it
// crosses such a boundary, then parseDictSection() will only parse the buffer
// up to the end of the current extent.  A second call to parseDictSection()
// should be made to parse the remainder of the buffer into the second extent.
//------------------------------------------------------------------------------
int BulkLoadBuffer::parseDictSection(ColumnInfo& columnInfo, int tokenPos, RID startRow,
                                     uint32_t totalReadRows, uint32_t& nRowsParsed)
{
  int rc = NO_ERROR;

  if (fLog->isDebug(DEBUG_2))
  {
    ostringstream oss;
    oss << "DctResSecIn:  OID-" << columnInfo.column.mapOid << "; StartRID/Rows: " << startRow << " "
        << totalReadRows;
    fLog->logMsg(oss.str(), MSGLVL_INFO2);
  }

  ColumnBufferSection* section = 0;
  RID lastInputRowInExtent = 0;
  RETURN_ON_ERROR(columnInfo.fColBufferMgr->reserveSection(startRow, totalReadRows, nRowsParsed, &section,
                                                           lastInputRowInExtent));

  if (nRowsParsed > 0)
  {
    char* tokenBuf = new char[nRowsParsed * 8];

    // Pass fDataParser data and fTokensParser meta data to dictionary
    // to be parsed and tokenized, with tokens returned in tokenBuf.
    rc = columnInfo.updateDctnryStore(fDataParser, &fTokensParser[tokenPos], nRowsParsed, tokenBuf);

    if (rc == NO_ERROR)
    {
#if 0
            int64_t* tokenVals = reinterpret_cast<int64_t*>(tokenBuf);

            for (unsigned int j = 0; j < nRowsParsed; j++)
            {
                if (tokenVals[j] == 0)
                {
                    ostringstream oss;
                    oss << "Warning: 0 token being stored for OID-" <<
                        columnInfo.curCol.dataFile.fid << "; file-" <<
                        columnInfo.curCol.dataFile.fSegFileName <<
                        "; input row number-" << fStartRowForLoggingParser + j;
                    fLog->logMsg( oss.str(), MSGLVL_INFO1 );
                }
            }

#endif
      section->write(tokenBuf, nRowsParsed);
      delete[] tokenBuf;

      if (fLog->isDebug(DEBUG_2))
      {
        ostringstream oss;
        RID rid1 = section->startRowId();
        RID rid2 = section->endRowId();
        oss << "DctRelSecOut: OID-" << columnInfo.column.mapOid << "; StartRID/Rows: " << rid1 << " "
            << (rid2 - rid1) + 1 << "; startOffset: " << section->getStartOffset();
        fLog->logMsg(oss.str(), MSGLVL_INFO2);
      }

      RETURN_ON_ERROR(columnInfo.fColBufferMgr->releaseSection(section));
    }
    else
    {
      delete[] tokenBuf;
    }
  }

  return rc;
}

int BulkLoadBuffer::fillFromMemory(const BulkLoadBuffer& overFlowBufIn, const char* input, size_t length,
                                   size_t* parse_length, RID& totalReadRows, RID& correctTotalRows,
                                   const boost::ptr_vector<ColumnInfo>& columnsInfo,
                                   unsigned int allowedErrCntThisCall)
{
  boost::mutex::scoped_lock lock(fSyncUpdatesBLB);
  reset();
  copyOverflow(overFlowBufIn);
  size_t readSize = 0;

  // Copy the overflow data from the last buffer, that did not get written
  if (fOverflowSize != 0)
  {
    memcpy(fData, fOverflowBuf, fOverflowSize);

    if (fOverflowBuf != NULL)
    {
      delete[] fOverflowBuf;
      fOverflowBuf = NULL;
    }
  }

  readSize = fBufferSize - fOverflowSize;
  if (readSize > (length - *parse_length))
  {
    readSize = length - *parse_length;
  }
  memcpy(fData + fOverflowSize, input + *parse_length, readSize);
  *parse_length += readSize;

  bool bEndOfData = false;

  if (length == *parse_length)
  {
    bEndOfData = true;
  }

  if (bEndOfData &&                           // @bug 3516: Add '\n' if missing from last record
      (fImportDataMode == IMPORT_DATA_TEXT))  // Only applies to ascii mode
  {
    if ((fOverflowSize > 0) | (readSize > 0))
    {
      if (fData[fOverflowSize + readSize - 1] != '\n')
      {
        // Should be safe to add byte to fData w/o risk of overflowing,
        // since we hit EOF.  That should mean fread() did not read all
        // the bytes we requested, meaning we have room to add a byte.
        fData[fOverflowSize + readSize] = '\n';
        readSize++;
      }
    }
  }

  // Lazy allocation of fToken memory as needed
  if (fTokens == 0)
  {
    resizeTokenArray();
  }

  if ((readSize > 0) || (fOverflowSize > 0))
  {
    if (fOverflowBuf != NULL)
    {
      delete[] fOverflowBuf;
      fOverflowBuf = NULL;
    }

    fReadSize = readSize + fOverflowSize;
    fStartRow = correctTotalRows;
    fStartRowForLogging = totalReadRows;

    if (fImportDataMode == IMPORT_DATA_TEXT)
    {
      tokenize(columnsInfo, allowedErrCntThisCall);
    }
    else
    {
      int rc = tokenizeBinary(columnsInfo, allowedErrCntThisCall, bEndOfData);

      if (rc != NO_ERROR)
        return rc;
    }

    // If we read a full buffer without hitting any new lines, then
    // terminate import because row size is greater than read buffer size.
    if ((fTotalReadRowsForLog == 0) && (fReadSize == fBufferSize))
    {
      return ERR_BULK_ROW_FILL_BUFFER;
    }

    totalReadRows += fTotalReadRowsForLog;
    correctTotalRows += fTotalReadRows;
  }

  return NO_ERROR;
}

//------------------------------------------------------------------------------
// Read the next set of rows from the input import file (for the specified
// table), into "this" BulkLoadBuffer.
// totalReadRows (input/output) - total row count from tokenize() (per file)
// correctTotalRows (input/output) - total valid row count from tokenize()
//   (cumulative)
//------------------------------------------------------------------------------
int BulkLoadBuffer::fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* handle, RID& totalReadRows,
                                 RID& correctTotalRows, const boost::ptr_vector<ColumnInfo>& columnsInfo,
                                 unsigned int allowedErrCntThisCall)
{
  boost::mutex::scoped_lock lock(fSyncUpdatesBLB);
  reset();
  copyOverflow(overFlowBufIn);
  size_t readSize = 0;

  // Copy the overflow data from the last buffer, that did not get written
  if (fOverflowSize != 0)
  {
    memcpy(fData, fOverflowBuf, fOverflowSize);

    if (fOverflowBuf != NULL)
    {
      delete[] fOverflowBuf;
      fOverflowBuf = NULL;
    }
  }

  readSize = fread(fData + fOverflowSize, 1, fBufferSize - fOverflowSize, handle);

  if (ferror(handle))
  {
    return ERR_FILE_READ_IMPORT;
  }

  bool bEndOfData = false;

  if (feof(handle))
    bEndOfData = true;

  if (bEndOfData &&                           // @bug 3516: Add '\n' if missing from last record
      (fImportDataMode == IMPORT_DATA_TEXT))  // Only applies to ascii mode
  {
    if ((fOverflowSize > 0) | (readSize > 0))
    {
      if (fData[fOverflowSize + readSize - 1] != '\n')
      {
        // Should be safe to add byte to fData w/o risk of overflowing,
        // since we hit EOF.  That should mean fread() did not read all
        // the bytes we requested, meaning we have room to add a byte.
        fData[fOverflowSize + readSize] = '\n';
        readSize++;
      }
    }
  }

  // Lazy allocation of fToken memory as needed
  if (fTokens == 0)
  {
    resizeTokenArray();
  }

  if ((readSize > 0) || (fOverflowSize > 0))
  {
    if (fOverflowBuf != NULL)
    {
      delete[] fOverflowBuf;
      fOverflowBuf = NULL;
    }

    fReadSize = readSize + fOverflowSize;
    fStartRow = correctTotalRows;
    fStartRowForLogging = totalReadRows;

    if (fImportDataMode == IMPORT_DATA_TEXT)
    {
      tokenize(columnsInfo, allowedErrCntThisCall);
    }
    else
    {
      int rc = tokenizeBinary(columnsInfo, allowedErrCntThisCall, bEndOfData);

      if (rc != NO_ERROR)
        return rc;
    }

    // If we read a full buffer without hitting any new lines, then
    // terminate import because row size is greater than read buffer size.
    if ((fTotalReadRowsForLog == 0) && (fReadSize == fBufferSize))
    {
      return ERR_BULK_ROW_FILL_BUFFER;
    }

    totalReadRows += fTotalReadRowsForLog;
    correctTotalRows += fTotalReadRows;
  }

  return NO_ERROR;
}

//------------------------------------------------------------------------------
// Parse the rows of data in "fData", saving the meta information that describes
// the parsed data, in fTokens.  If the number of read parsing errors for a
// given call to tokenize() should exceed the value of "allowedErrCntThisCall",
// then tokenize() will stop reading data and exit.
//
// We parse the data using the following state machine-like table.
// Enclosed by character ("), escaped by character (\), and field delimiter
// (|) can all be overridden; but we show default values in the state table.
//
//                    Character(s) found and action taken
//
//   Current          \" or    "          |        \n       other
//   State            ""                                    character
//   -----------------------------------------------------------------------
//   LEADING_CHAR   | n/a      ENCLOSED   endFld   endFld   NORMAL
//   TRAILING_CHAR  | n/a      n/a        endFld   endFld   ignore
//   ENCLOSED       | convert  TRAIL      n/a      n/a      n/a
//   NORMAL         | n/a      n/a        endFld   endFld   n/a
//   -----------------------------------------------------------------------
//
//   n/a      - not applicable; no check is made for this specific character in
//              this state
//   ENCLOSED - transition to ENCLOSED state
//   TRAIL    - transition to TRAILING_CHAR state
//   NORMAL   - transition to NORMAL state
//   convert  - convert an escaped double quote (\" or "") to a single double
//              quote ("), and strip out the other character
//
// The initial parsing state for each column is LEADING_CHAR or NORMAL,
// depending on whether the user has enabled the "enclosed by" feature.
//------------------------------------------------------------------------------
void BulkLoadBuffer::tokenize(const boost::ptr_vector<ColumnInfo>& columnsInfo,
                              unsigned int allowedErrCntThisCall)
{
  unsigned offset = 0;           // length of field
  unsigned curCol = 0;           // dest db column counter within a row
  unsigned curFld = 0;           // src input field counter within a row
  unsigned curRowNum = 0;        // "total" number of rows read during this call
  unsigned curRowNum1 = 0;       // number of "valid" rows inserted into fTokens
  char* p;                       // iterates thru each byte in the input buffer
  char c;                        // value of byte at address "p".
  char* lastRowHead = 0;         // start of latest row being processed
  bool bValidRow = true;         // track whether current row is valid
  bool bRowGenAutoInc = false;   // track whether row uses generated auto-inc
  std::string validationErrMsg;  // validation error msg (if any) for current row
  unsigned errorCount = 0;
  const char FIELD_DELIM_CHAR = fColDelim;
  const char STRING_ENCLOSED_CHAR = fEnclosedByChar;
  const char ESCAPE_CHAR = fEscapeChar;
  const char LINE_FEED = 0x0D;
  const char CARRIAGE_RETURN = 0x0A;

  // Variables used to store raw data read for a row; needed if we strip out
  // enclosed char(s) and later have to print original data in a *.bad file
  char* pRawDataRow = 0;
  unsigned rawDataRowCapacity = 0;
  unsigned rawDataRowLength = 0;
  const unsigned MIN_RAW_DATA_CAP = 1024;

  // Enable "enclosed by" checking if user specified an "enclosed by" char
  FieldParsingState initialState = FLD_PARSE_NORMAL_STATE;

  if (STRING_ENCLOSED_CHAR != '\0')
  {
    initialState = FLD_PARSE_LEADING_CHAR_STATE;
  }

  FieldParsingState fieldState = initialState;
  bool bNewLine = false;  // Tracks new line
  unsigned start = 0;     // Where next field starts in fData
  unsigned idxFrom = 0;   // idxFrom and idxTo are used to strip out
  unsigned idxTo = 0;     //   escape characters in \" and ""

  // Initialize which field values are enclosed
  unsigned int enclosedFieldFlag = 0;
#ifdef DEBUG_TOKEN_PARSING
  unsigned int enclosedFieldFlags[fNumberOfColumns];
  memset(enclosedFieldFlags, 0, sizeof(unsigned) * fNumberOfColumns);
#endif

  p = lastRowHead = fData;
  const char* pEndOfData = p + fReadSize;  //@bug3810 set an end-of-data marker

  //--------------------------------------------------------------------------
  // Loop through all the bytes in the read buffer in order to construct
  // the meta data stored in fTokens.
  //--------------------------------------------------------------------------
  while (p < pEndOfData)
  {
    c = *p;

    // If we have stripped "enclosed" characters, then save raw data
    if (rawDataRowLength > 0)
    {
      if (rawDataRowLength == rawDataRowCapacity)  // resize array if full
      {
        rawDataRowCapacity = rawDataRowCapacity * 2;
        resizeRowDataArray(&pRawDataRow, rawDataRowLength, rawDataRowCapacity);
      }

      pRawDataRow[rawDataRowLength] = c;
      rawDataRowLength++;
    }

    //----------------------------------------------------------------------
    // Branch based on current parsing state for this field.
    // Note that we fall out of switch/case and do more processing if we
    // have hit end of column or line; else we "continue" directly to end
    // of loop to process the next byte.
    //----------------------------------------------------------------------
    switch (fieldState)
    {
      //------------------------------------------------------------------
      // FLD_PARSE_NORMAL_STATE
      // Field not enclosed in a string delimiter such as a double quote
      //------------------------------------------------------------------
      case FLD_PARSE_NORMAL_STATE:
      {
        if ((c == FIELD_DELIM_CHAR) || (c == NEWLINE_CHAR))
        {
          start = p - fData - offset;

          if (c == NEWLINE_CHAR)
            bNewLine = true;
        }
        else
        {
          offset++;
          p++;
          continue;  // process next byte
        }

        break;
      }

      // If state is something other than FLD_PARSE_NORMAL_STATE, then
      // there is extra processing to allow for fields that may be en-
      // closed within a string delimiter (such as a double quote)

      //----------------------------------------------------------------
      // FLD_PARSE_LEADING_CHAR_STATE
      //----------------------------------------------------------------
      case FLD_PARSE_LEADING_CHAR_STATE:
      {
        bool bNewColumn = false;

        if (c == STRING_ENCLOSED_CHAR)
        {
          fieldState = FLD_PARSE_ENCLOSED_STATE;
          idxFrom = p - fData + 1;
          idxTo = idxFrom;
          start = idxTo;
          offset = 0;
          enclosedFieldFlag = 1;
        }

        else if ((c == FIELD_DELIM_CHAR) || (c == NEWLINE_CHAR))
        {
          bNewColumn = true;
          start = p - fData;
          offset = 0;

          if (c == NEWLINE_CHAR)
            bNewLine = true;
        }

        else
        {
          fieldState = FLD_PARSE_NORMAL_STATE;
          start = p - fData;
          offset = 1;
        }

        if (!bNewColumn)
        {
          p++;
          continue;  // process next byte
        }

        break;
      }

      //------------------------------------------------------------------
      // FLD_PARSE_ENCLOSED_STATE
      //------------------------------------------------------------------
      case FLD_PARSE_ENCLOSED_STATE:
      {
        if ((p + 1 < pEndOfData) &&
            (((c == ESCAPE_CHAR) && ((*(p + 1) == STRING_ENCLOSED_CHAR) || (*(p + 1) == ESCAPE_CHAR) ||
                                     (*(p + 1) == LINE_FEED) || (*(p + 1) == CARRIAGE_RETURN))) ||
             ((c == STRING_ENCLOSED_CHAR) && (*(p + 1) == STRING_ENCLOSED_CHAR))))
        {
          // Create/save original data before stripping out bytes
          if (rawDataRowLength == 0)
          {
            rawDataRowLength = (p + 1) - lastRowHead + 1;
            rawDataRowCapacity = rawDataRowLength * 2;

            if (rawDataRowCapacity < MIN_RAW_DATA_CAP)
              rawDataRowCapacity = MIN_RAW_DATA_CAP;

            pRawDataRow = new char[rawDataRowCapacity];
            memcpy(pRawDataRow, lastRowHead, rawDataRowLength);
          }
          else
          {
            if (rawDataRowLength == rawDataRowCapacity)
            {
              // resize array if full
              rawDataRowCapacity = rawDataRowCapacity * 2;
              resizeRowDataArray(&pRawDataRow, rawDataRowLength, rawDataRowCapacity);
            }

            pRawDataRow[rawDataRowLength] = *(p + 1);
            rawDataRowLength++;
          }

          fData[idxTo] = *(p + 1);
          idxFrom += 2;
          idxTo++;
          offset++;
          p++;
        }

        else if (c == STRING_ENCLOSED_CHAR)
        {
          fieldState = FLD_PARSE_TRAILING_CHAR_STATE;
        }

        else
        {
          if (idxTo != idxFrom)
            fData[idxTo] = fData[idxFrom];

          idxFrom++;
          idxTo++;
          offset++;
        }

        p++;
        continue;  // process next byte
      }

      //------------------------------------------------------------------
      // FLD_PARSE_TRAILING_CHAR_STATE
      // Ignore any trailing chars till we reach field or line delimiter.
      //------------------------------------------------------------------
      case FLD_PARSE_TRAILING_CHAR_STATE:
      default:
      {
        if ((c == FIELD_DELIM_CHAR) || (c == NEWLINE_CHAR))
        {
          if (c == NEWLINE_CHAR)
            bNewLine = true;
        }
        else
        {
          p++;
          continue;  // process next byte
        }

        break;
      }
    }  // end of switch on fieldState

    //----------------------------------------------------------------------
    // Finished reading the bytes in the next source field.
    // See if source field is to be included (or ignored)
    //----------------------------------------------------------------------
    if ((curFld < fNumFieldsInFile) && (fFieldList[curFld].fFldColType == BULK_FLDCOL_COLUMN_FIELD))
    {
      //------------------------------------------------------------------
      // Process destination column or end of row if source field is to
      // be included as part of output to database.
      //------------------------------------------------------------------
      if (curCol < fNumColsInFile)
      {
        const JobColumn& jobCol = columnsInfo[curCol].column;

        // tmp code to test trailing space
        if (jobCol.dataType == CalpontSystemCatalog::CHAR)
        {
          // cout << "triming ... " << endl;
          char* tmp = p;

          while (tmp != lastRowHead && *(--tmp) == ' ')
          {
            // cout << "offset is " << offset <<endl;
            offset--;
          }
        }

        fTokens[curRowNum1][curCol].start = start;
        fTokens[curRowNum1][curCol].offset = offset;
#ifdef DEBUG_TOKEN_PARSING
        enclosedFieldFlags[curCol] = enclosedFieldFlag;
#endif

        // Would like to refactor this validation logic into a separate
        // inline function, but code may be too long for compiler
        // to inline.  And factoring out into a non-inline function
        // slows down the read thread by 10%.  So left code here.
        if (offset)
        {
          switch (fTokens[curRowNum1][curCol].offset)
          {
            // Special auto-increment case; treat '0' as null value
            case 1:
            {
              if ((jobCol.autoIncFlag) && (*(fData + fTokens[curRowNum1][curCol].start) == NULL_AUTO_INC_0))
              {
                fTokens[curRowNum1][curCol].offset = COLPOSPAIR_NULL_TOKEN_OFFSET;
                bRowGenAutoInc = true;
              }
              else if (jobCol.dataType == CalpontSystemCatalog::VARBINARY && (bValidRow))
              {
                bValidRow = false;

                ostringstream ossErrMsg;
                ossErrMsg << INPUT_ERROR_ODD_VARBINARY_LENGTH << "field " << (curFld + 1) << " has " << offset
                          << " bytes";
                validationErrMsg = ossErrMsg.str();
              }

              break;
            }

            case 2:
            {
              if ((*(fData + fTokens[curRowNum1][curCol].start) == ESCAPE_CHAR) &&
                  (*(fData + fTokens[curRowNum1][curCol].start + 1) == NULL_CHAR))
              {
                fTokens[curRowNum1][curCol].offset = COLPOSPAIR_NULL_TOKEN_OFFSET;

                if (jobCol.autoIncFlag)
                  bRowGenAutoInc = true;
              }

              break;
            }

            // If enclosedFlag set, then treat "NULL" as data and
            // not as a null value
            case 4:
            {
              if ((fNullStringMode) && (!enclosedFieldFlag))
              {
                if ((*(fData + fTokens[curRowNum1][curCol].start) == NULL_VALUE_STRING[0]) &&
                    (*(fData + fTokens[curRowNum1][curCol].start + 1) == NULL_VALUE_STRING[1]) &&
                    (*(fData + fTokens[curRowNum1][curCol].start + 2) == NULL_VALUE_STRING[2]) &&
                    (*(fData + fTokens[curRowNum1][curCol].start + 3) == NULL_VALUE_STRING[3]))
                {
                  fTokens[curRowNum1][curCol].offset = COLPOSPAIR_NULL_TOKEN_OFFSET;

                  if (jobCol.autoIncFlag)
                    bRowGenAutoInc = true;
                }
              }

              break;
            }

            default:
            {
              if ((jobCol.dataType == CalpontSystemCatalog::VARBINARY) && ((offset & 1) == 1) && (bValidRow))
              {
                bValidRow = false;

                ostringstream ossErrMsg;
                ossErrMsg << INPUT_ERROR_ODD_VARBINARY_LENGTH << "field " << (curFld + 1) << " has " << offset
                          << " bytes";
                validationErrMsg = ossErrMsg.str();
              }

              // @bug 3478: Truncate instead of rejecting dctnry
              // strings>8000. Only reject numeric cols>1000 bytes
              else if ((fTokens[curRowNum1][curCol].offset > MAX_FIELD_SIZE) &&
                       (jobCol.colType != COL_TYPE_DICT) && (bValidRow))
              {
                bValidRow = false;

                ostringstream ossErrMsg;
                ossErrMsg << INPUT_ERROR_TOO_LONG << "field " << (curFld + 1) << " longer than "
                          << MAX_FIELD_SIZE << " bytes";
                validationErrMsg = ossErrMsg.str();
              }

              break;
            }
          }  // end of switch on offset

          // @bug 4037: When cmd line option set, treat char
          // and varchar fields that are too long as errors
          if (getTruncationAsError() && bValidRow &&
              (fTokens[curRowNum1][curCol].offset != COLPOSPAIR_NULL_TOKEN_OFFSET))
          {
            if ((jobCol.dataType == CalpontSystemCatalog::VARCHAR ||
                 jobCol.dataType == CalpontSystemCatalog::CHAR) &&
                (fTokens[curRowNum1][curCol].offset > jobCol.definedWidth))
            {
              bValidRow = false;

              ostringstream ossErrMsg;
              ossErrMsg << INPUT_ERROR_STRING_TOO_LONG << "field " << (curFld + 1) << " longer than "
                        << jobCol.definedWidth << " bytes";
              validationErrMsg = ossErrMsg.str();
            }
          }

        }  // end of "if (offset)"
        else
        {
          fTokens[curRowNum1][curCol].offset = COLPOSPAIR_NULL_TOKEN_OFFSET;

          if (jobCol.autoIncFlag)
            bRowGenAutoInc = true;
        }

        // For non-autoincrement column,
        // Validate a NotNull column is supplied a value or a default
        if (!bRowGenAutoInc)
        {
          if ((jobCol.fNotNull) && (fTokens[curRowNum1][curCol].offset == COLPOSPAIR_NULL_TOKEN_OFFSET) &&
              (!jobCol.fWithDefault) && (bValidRow))
          {
            bValidRow = false;

            ostringstream ossErrMsg;
            ossErrMsg << INPUT_ERROR_NULL_CONSTRAINT << "; field " << (curFld + 1);
            validationErrMsg = ossErrMsg.str();
          }
        }
      }  // end if curCol < fNumberOfColumns

      curCol++;
    }

    curFld++;

    //----------------------------------------------------------------------
    // End-of-row processing
    //----------------------------------------------------------------------
    if (bNewLine)
    {
      // Debug: Dump next row that may or may not be accepted as
      // valid.  Not a typo, that we print "curRowNum" as the row
      // number, but we use curRowNum1 as the index into fTokens.
#ifdef DEBUG_TOKEN_PARSING
      std::cout << "Row " << curRowNum + 1 << ". fTokens: "
                << "(start,offset,enclosed)" << std::endl;
      unsigned kColCount = fNumColsInFile;

      if (curCol < kColCount)
        kColCount = curCol;

      for (unsigned int k = 0; k < kColCount; k++)
      {
        std::cout << "  (" << fTokens[curRowNum1][k].start << "," << fTokens[curRowNum1][k].offset << ","
                  << enclosedFieldFlags[k] << ") ";

        if (fTokens[curRowNum1][k].offset != COLPOSPAIR_NULL_TOKEN_OFFSET)
        {
          std::string outField(fData + fTokens[curRowNum1][k].start, fTokens[curRowNum1][k].offset);
          std::cout << "  " << outField << std::endl;
        }
        else
        {
          std::cout << "  <NULL>" << std::endl;
        }
      }

#endif

      curRowNum++;  // increment total number of rows read
      int rowLength = p - lastRowHead + 1;

      // @bug 3146: Allow optional trailing value at end of input file
      // Don't count last column if no data after last delimiter,
      // and we don't need that last column.
      if ((offset == 0) && (curFld == (fNumFieldsInFile + 1)))
      {
        curFld--;
      }

      if ((curFld != fNumFieldsInFile) && (bValidRow))
      {
        bValidRow = false;

        ostringstream ossErrMsg;
        ossErrMsg << INPUT_ERROR_WRONG_NO_COLUMNS << "; num fields expected-" << fNumFieldsInFile
                  << "; num fields found-" << curFld;
        validationErrMsg = ossErrMsg.str();
      }

      if (bValidRow)
      {
        // Initialize fTokens for <DefaultColumn> tags not in input file
        if (fNumColsInFile < fNumberOfColumns)
        {
          for (unsigned int n = fNumColsInFile; n < fNumberOfColumns; n++)
          {
            fTokens[curRowNum1][n].start = 0;
            fTokens[curRowNum1][n].offset = COLPOSPAIR_NULL_TOKEN_OFFSET;

            if (columnsInfo[n].column.autoIncFlag)
              bRowGenAutoInc = true;
          }
        }

        curRowNum1++;  // increment valid row count

        if (bRowGenAutoInc)
          fAutoIncGenCount++;  // update number of generated auto-incs
      }
      else
      {
        // Store validation error message to be logged
        if (rawDataRowLength == 0)
        {
          string tmp(lastRowHead, rowLength);
          fErrRows.push_back(tmp);
        }
        else
        {
          string tmp(pRawDataRow, rawDataRowLength);
          fErrRows.push_back(tmp);
        }

        fRowStatus.push_back(std::pair<RID, std::string>(fStartRowForLogging + curRowNum, validationErrMsg));

        errorCount++;

        // Quit if we exceed max allowable errors for this call.
        // We set lastRowHead = p, so that the code that follows this
        // loop won't try to save any data in fOverflowBuf.
        if (errorCount > allowedErrCntThisCall)
        {
          lastRowHead = p + 1;
          p++;
          break;
        }
      }

      curCol = 0;
      curFld = 0;
      lastRowHead = p + 1;
      rawDataRowLength = 0;

      // Resize fTokens array if we are about to fill it up
      if (curRowNum1 >= fTotalRows)
      {
        resizeTokenArray();
      }

      bNewLine = false;
      bValidRow = true;
      bRowGenAutoInc = false;
#ifdef DEBUG_TOKEN_PARSING

      if (initialState == FLD_PARSE_LEADING_CHAR_STATE)
        memset(enclosedFieldFlags, 0, sizeof(unsigned) * fNumberOfColumns);

#endif
    }  // end of (bNewLine)

    offset = 0;
    fieldState = initialState;
    enclosedFieldFlag = 0;

    p++;
  }  // end of (p < pEndOfData) loop to step thru the read buffer

  // Save any leftover data that we did not yet parse, into fOverflowBuf
  if (p > lastRowHead)
  {
    fOverflowSize = p - lastRowHead;
    fOverflowBuf = new char[fOverflowSize];

    // If we stripped out any chars, be sure to preserve the original data
    if (rawDataRowLength == 0)
      memcpy(fOverflowBuf, lastRowHead, fOverflowSize);
    else
      memcpy(fOverflowBuf, pRawDataRow, fOverflowSize);
  }
  else
  {
    fOverflowSize = 0;
    fOverflowBuf = NULL;
  }

  fTotalReadRows = curRowNum1;       // number of valid rows read
  fTotalReadRowsForLog = curRowNum;  // total number of rows read

  if (pRawDataRow)
    delete[] pRawDataRow;
}

//------------------------------------------------------------------------------
// Resize the fTokens array used to store meta data about the input read buffer.
// Used for initial allocation as well.
//------------------------------------------------------------------------------
void BulkLoadBuffer::resizeTokenArray()
{
  unsigned tmpTotalRows = 0;

  if (!fTokens)
  {
    tmpTotalRows = fBufferSize / 100;

    // Estimate the number of rows we can store in
    // one buffer by getting length of first record
    for (unsigned int k = 0; k < (fBufferSize - fOverflowSize); k++)
    {
      if (fData[k] == NEWLINE_CHAR)
      {
        tmpTotalRows = fBufferSize / (k + 1);
        break;
      }
    }
  }
  else
  {
    tmpTotalRows = (unsigned int)(fTotalRows * 1.25);

    // @bug 3478: Make sure token array is expanded.
    // If rows are loooong, then fTotalRows may be small (< 4), in which
    // a 1.25 factor won't increase the row count.  So this check is here
    // to make sure we increase the row count in this case.
    if (tmpTotalRows <= fTotalRows)
      tmpTotalRows = fTotalRows * 2;
  }

  if (fLog->isDebug(DEBUG_1))
  {
    std::string allocLabel("Re-Allocating");

    if (!fTokens)
      allocLabel = "Allocating";

    ostringstream oss;
    oss << "Table: " << fTableName << "; ReadBuffer: " << fBufferId << "; " << allocLabel
        << " ColValue metadata of size " << sizeof(ColPosPair) << " for " << tmpTotalRows << " rows and "
        << fNumberOfColumns << " columns ";
    fLog->logMsg(oss.str(), MSGLVL_INFO2);
  }

  ColPosPair** tmp;
  tmp = new ColPosPair*[tmpTotalRows];

  if (fTokens)
  {
    memcpy(tmp, fTokens, sizeof(ColPosPair*) * fTotalRows);
    delete[] fTokens;
  }

  fTokens = tmp;

  // Allocate a ColPosPair array for each new row
  for (unsigned i = fTotalRows; i < tmpTotalRows; ++i)
    fTokens[i] = new ColPosPair[fNumberOfColumns];

  fTotalRows = tmpTotalRows;
}

//@bug 5027: Add tokenizeBinary() and isBinaryFieldNull() for binary imports
//------------------------------------------------------------------------------
// Alternatve version of tokenize() uesd to import fixed length records in
// binary mode.
// Parse the rows of data in "fData", saving the meta information that describes
// the parsed data, in fTokens.  If the number of read parsing errors for a
// given call to tokenize() should exceed the value of "allowedErrCntThisCall",
// then tokenize() will stop reading data and exit.
//------------------------------------------------------------------------------
int BulkLoadBuffer::tokenizeBinary(const boost::ptr_vector<ColumnInfo>& columnsInfo,
                                   unsigned int allowedErrCntThisCall, bool bEndOfData)
{
  unsigned curCol = 0;           // dest db column counter within a row
  unsigned curRowNum = 0;        // "total" number of rows read during this call
  unsigned curRowNum1 = 0;       // number of "valid" rows inserted into fTokens
  char* p;                       // iterates thru each field in the input buffer
  char* lastRowHead = 0;         // start of latest row being processed
  bool bValidRow = true;         // track whether current row is valid
  bool bRowGenAutoInc = false;   // track whether row uses generated auto-inc
  std::string validationErrMsg;  // validation error msg (if any) for current row
  unsigned errorCount = 0;
  int rc = NO_ERROR;

  p = lastRowHead = fData;

  ldiv_t rowcnt = ldiv(fReadSize, fFixedBinaryRecLen);

  //--------------------------------------------------------------------------
  // Loop through all the bytes in the read buffer in order to construct
  // the meta data stored in fTokens.
  //--------------------------------------------------------------------------
  for (long kRow = 0; kRow < rowcnt.quot; kRow++)
  {
    //----------------------------------------------------------------------
    // Manage all the fields in a row
    //----------------------------------------------------------------------
    for (unsigned int curFld = 0; curFld < fNumFieldsInFile; curFld++)
    {
      if (fFieldList[curFld].fFldColType == BULK_FLDCOL_COLUMN_FIELD)
      {
        const JobColumn& jobCol = columnsInfo[curCol].column;

        if (curCol < fNumColsInFile)
        {
          fTokens[curRowNum1][curCol].start = p - fData;
          fTokens[curRowNum1][curCol].offset = jobCol.definedWidth;

          // Special auto-increment case; treat 0 as null value
          if (jobCol.autoIncFlag)
          {
            if (memcmp(p, &NULL_AUTO_INC_0_BINARY, jobCol.definedWidth) == 0)
            {
              fTokens[curRowNum1][curCol].offset = COLPOSPAIR_NULL_TOKEN_OFFSET;
              bRowGenAutoInc = true;
            }
          }

          switch (jobCol.weType)
          {
            case WR_CHAR:
            {
              // Detect empty string for CHAR and VARCHAR
              if (*p == '\0')
                fTokens[curRowNum1][curCol].offset = COLPOSPAIR_NULL_TOKEN_OFFSET;

              break;
            }

            case WR_VARBINARY:
            {
              // Detect empty VARBINARY field
              int kk;

              for (kk = 0; kk < jobCol.definedWidth; kk++)
              {
                if (p[kk] != '\0')
                  break;
              }

              if (kk >= jobCol.definedWidth)
                fTokens[curRowNum1][curCol].offset = COLPOSPAIR_NULL_TOKEN_OFFSET;

              break;
            }

            default:
            {
              // In BinaryAcceptNULL mode, check for NULL value
              if ((fTokens[curRowNum1][curCol].offset != COLPOSPAIR_NULL_TOKEN_OFFSET) &&
                  (fImportDataMode == IMPORT_DATA_BIN_ACCEPT_NULL))
              {
                if (isBinaryFieldNull(p, jobCol.weType, jobCol.dataType))
                {
                  fTokens[curRowNum1][curCol].offset = COLPOSPAIR_NULL_TOKEN_OFFSET;

                  if (jobCol.autoIncFlag)
                    bRowGenAutoInc = true;
                }
              }

              break;
            }
          }  // end of switch (jobCol.weType)

          // Validate NotNull column is supplied a value or a default
          if ((jobCol.fNotNull) && (fTokens[curRowNum1][curCol].offset == COLPOSPAIR_NULL_TOKEN_OFFSET) &&
              (!jobCol.fWithDefault) && (bValidRow))
          {
            bValidRow = false;

            ostringstream ossErrMsg;
            ossErrMsg << INPUT_ERROR_NULL_CONSTRAINT << "; field " << (curFld + 1);
            validationErrMsg = ossErrMsg.str();
          }
        }  // end "if (curCol < fNumColsInFile)"

        p += jobCol.definedWidth;
        curCol++;
      }
      else
      {
        // This is where we would handle <IgnoreField> fields
        // if they were supported in Binary Import mode
        // p += ?
      }
    }  // end of loop through fields in a row

    //----------------------------------------------------------------------
    // End-of-row processing
    //----------------------------------------------------------------------

    curRowNum++;  // increment total number of rows read

    if (bValidRow)
    {
      // Initialize fTokens for <DefaultColumn> tags not in input file
      if (fNumColsInFile < fNumberOfColumns)
      {
        for (unsigned int n = fNumColsInFile; n < fNumberOfColumns; n++)
        {
          fTokens[curRowNum1][n].start = 0;
          fTokens[curRowNum1][n].offset = COLPOSPAIR_NULL_TOKEN_OFFSET;

          if (columnsInfo[n].column.autoIncFlag)
            bRowGenAutoInc = true;
        }
      }

      curRowNum1++;  // increment valid row count

      if (bRowGenAutoInc)
        fAutoIncGenCount++;  // update number of generated auto-incs
    }
    else
    {
      // Store validation error message to be logged
      string tmp(lastRowHead, fFixedBinaryRecLen);
      fErrRows.push_back(tmp);

      fRowStatus.push_back(std::pair<RID, std::string>(fStartRowForLogging + curRowNum, validationErrMsg));

      errorCount++;

      // Quit if we exceed max allowable errors for this call
      if (errorCount > allowedErrCntThisCall)
        break;
    }

    curCol = 0;
    lastRowHead += fFixedBinaryRecLen;

    // Resize fTokens array if we are about to fill it up
    if (curRowNum1 >= fTotalRows)
    {
      resizeTokenArray();
    }

    bValidRow = true;
    bRowGenAutoInc = false;
  }  // end of loop through the rows in the read buffer

  // Save any leftover data that we did not yet parse, into fOverflowBuf
  if (rowcnt.rem > 0)
  {
    if (bEndOfData)
    {
      rc = ERR_BULK_BINARY_PARTIAL_REC;
      ostringstream oss;
      oss << "Incomplete record (" << rowcnt.rem
          << " bytes) at end "
             "of import data; expected fixed length records of length "
          << fFixedBinaryRecLen << " bytes";
      fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
    }
    else
    {
      fOverflowSize = rowcnt.rem;
      fOverflowBuf = new char[fOverflowSize];

      memcpy(fOverflowBuf, (fData + fReadSize - rowcnt.rem), fOverflowSize);
    }
  }
  else
  {
    fOverflowSize = 0;
    fOverflowBuf = NULL;
  }

  fTotalReadRows = curRowNum1;       // number of valid rows read
  fTotalReadRowsForLog = curRowNum;  // total number of rows read

  return rc;
}

//------------------------------------------------------------------------------
// Compare the numeric value (val) against the relevant NULL value, based on
// column type (ct and dt), to see whether the specified value is NULL.
//------------------------------------------------------------------------------
bool BulkLoadBuffer::isBinaryFieldNull(void* val, WriteEngine::ColType ct,
                                       execplan::CalpontSystemCatalog::ColDataType dt)
{
  bool isNullFlag = false;

  switch (ct)
  {
    case WriteEngine::WR_BYTE:
    {
      if ((*(uint8_t*)val) == joblist::TINYINTNULL)
        isNullFlag = true;

      break;
    }

    case WriteEngine::WR_SHORT:
    {
      if ((*(uint16_t*)val) == joblist::SMALLINTNULL)
        isNullFlag = true;

      break;
    }

    case WriteEngine::WR_INT:
    {
      if (dt == execplan::CalpontSystemCatalog::DATE)
      {
        if ((*(uint32_t*)val) == joblist::DATENULL)
          isNullFlag = true;
      }
      else
      {
        if ((*(uint32_t*)val) == joblist::INTNULL)
          isNullFlag = true;
      }

      break;
    }

    case WriteEngine::WR_LONGLONG:
    {
      if (dt == execplan::CalpontSystemCatalog::DATETIME)
      {
        if ((*(uint64_t*)val) == joblist::DATETIMENULL)
          isNullFlag = true;
      }
      else if (dt == execplan::CalpontSystemCatalog::TIMESTAMP)
      {
        if ((*(uint64_t*)val) == joblist::TIMESTAMPNULL)
          isNullFlag = true;
      }
      else if (dt == execplan::CalpontSystemCatalog::TIME)
      {
        if ((*(uint64_t*)val) == joblist::TIMENULL)
          isNullFlag = true;
      }
      else
      {
        if ((*(uint64_t*)val) == joblist::BIGINTNULL)
          isNullFlag = true;
      }

      break;
    }

    case WriteEngine::WR_FLOAT:
    {
      if ((*(uint32_t*)val) == joblist::FLOATNULL)
        isNullFlag = true;

      break;
    }

    case WriteEngine::WR_DOUBLE:
    {
      if ((*(uint64_t*)val) == joblist::DOUBLENULL)
        isNullFlag = true;

      break;
    }

    // Detect empty string for CHAR and VARCHAR
    case WriteEngine::WR_CHAR:
    {
      // not applicable
      break;
    }

    // Detect empty VARBINARY field
    case WriteEngine::WR_VARBINARY:
    {
      // not applicable
      break;
    }

    case WriteEngine::WR_UBYTE:
    {
      if ((*(uint8_t*)val) == joblist::UTINYINTNULL)
        isNullFlag = true;

      break;
    }

    case WriteEngine::WR_USHORT:
    {
      if ((*(uint16_t*)val) == joblist::USMALLINTNULL)
        isNullFlag = true;

      break;
    }

    case WriteEngine::WR_UINT:
    {
      if ((*(uint32_t*)val) == joblist::UINTNULL)
        isNullFlag = true;

      break;
    }

    case WriteEngine::WR_ULONGLONG:
    {
      if ((*(uint64_t*)val) == joblist::UBIGINTNULL)
        isNullFlag = true;

      break;
    }

    case WriteEngine::WR_BINARY:
    {
      if ((*((int128_t*)val)) == datatypes::Decimal128Null)
        isNullFlag = true;

      break;
    }

    default:
    {
      break;
    }
  }

  return isNullFlag;
}

//------------------------------------------------------------------------------
// Sets the column status.
// returns TRUE if all columns in the buffer are complete.
//
// Note that fSyncUpdatesTI mutex is used to synchronize usage of fColumnLocks
// and fParseComplete from both read and parse threads.
//
// setColumnStatus() and tryAndLockColumn() formerly used fSyncUpdatesBLB mutex.
// But this seemed inconsistent because resetColumnLocks(), getColumnStatus(),
// and getColumnLocker() were not using this mutex.  In researching the idea of
// adding fSyncUpdatesBLB locks to these functions, I determined, that all the
// calls to the following functions were protected by a fSyncUpdatesTI mutex:
//   setColumnStatus()
//   tryAndLockColumn()
//   resetColumnLocks()
//   getColumnStatus()
//   getColumnLocker()
// So I added this note and removed the extraneous fSyncUpdatesBLB lock from
// setColumnStatus() and tryAndLockColumn().  (dmc-07/19/2009)
//------------------------------------------------------------------------------
bool BulkLoadBuffer::setColumnStatus(const int& columnId, const Status& status)
{
  fColumnLocks[columnId].status = status;

  if (status == WriteEngine::PARSE_COMPLETE)
    fParseComplete++;

  if (fParseComplete == fNumberOfColumns)
    return true;

  return false;
}

//------------------------------------------------------------------------------
// Note that fSyncUpdatesTI mutex is used to synchronize usage of fColumnLocks
// and fParseComplete from both read and parse threads.
//------------------------------------------------------------------------------
bool BulkLoadBuffer::tryAndLockColumn(const int& columnId, const int& id)
{
  if ((fColumnLocks[columnId].status != WriteEngine::PARSE_COMPLETE) && (fColumnLocks[columnId].locker == -1))
  {
    fColumnLocks[columnId].locker = id;
    return true;
  }

  return false;
}

}  // namespace WriteEngine
